package com.ruoyi.rocketMq;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

/**
 * Demo consumer that reads messages from a RocketMQ dead-letter queue (DLQ).
 *
 * <p>RocketMQ routes messages that exhausted their retries to a topic named
 * {@code %DLQ%<consumerGroup>}. To read those messages a consumer must
 * <em>subscribe to that topic</em>; the {@code %DLQ%} name is a topic, not a
 * consumer group. This class therefore uses its own dedicated consumer group
 * and subscribes to {@link #DLQ_TOPIC}.
 *
 * @author cyh
 * @date 2022/7/8 2:59 PM
 */
public class DLQConsumer {

    /** Address of the RocketMQ name server used by this demo. */
    private static final String NAMESRV_ADDR = "192.168.1.179:9876";

    /** Dead-letter topic to drain: {@code %DLQ%} followed by the failing consumer group ({@code cgQ}). */
    private static final String DLQ_TOPIC = "%DLQ%cgQ";

    /** Dedicated consumer group for this DLQ reader, kept distinct from the DLQ topic name. */
    private static final String DLQ_CONSUMER_GROUP = "cgQDlqConsumer";

    /**
     * Starts a push consumer that prints every dead-letter message and acknowledges it.
     *
     * @param args unused
     * @throws MQClientException if the subscription cannot be registered or the consumer fails to start
     */
    public static void main(String[] args) throws MQClientException {
        // Create a push consumer under its own consumer group.
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(DLQ_CONSUMER_GROUP);
        // Point the client at the name server.
        consumer.setNamesrvAddr(NAMESRV_ADDR);
        // Start from the earliest offset so dead letters that already exist are read too.
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // Subscribe to the dead-letter topic, all tags.
        // NOTE(review): brokers may create DLQ topics without read permission; if nothing
        // arrives, check the topic's perm on the broker -- confirm for your broker version.
        consumer.subscribe(DLQ_TOPIC, "*");
        // Allow a single redelivery if this handler ever reports a failure.
        consumer.setMaxReconsumeTimes(1);

        // Register the message listener.
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                // Handle the batch one message at a time.
                for (MessageExt msg : msgs) {
                    System.out.println("死信收到消息：" + msg);
                }
                // Acknowledge the batch. Previously a constant-true condition forced
                // RECONSUME_LATER on every call, so no message was ever acknowledged.
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        // Start consuming.
        consumer.start();
        System.out.println("Consumer Started");
    }
}
